package com.tdk.mybatisplus.demo.common.config;

import cn.hutool.core.util.StrUtil;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Maps;
import com.google.common.eventbus.Subscribe;
import com.tdk.mybatisplus.demo.common.entity.EventBusUser;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.web.socket.*;


import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.Map;

import static com.tdk.mybatisplus.demo.common.entity.WebSocketResultEnum.REPLY_MESSAGE;


/**
 * <p>
 * webSocket
 * </p>
 *
 * @author: taodingkai
 * @modified:
 * @since: 2022/8/24 10:25
 */
@Slf4j
public class DefaultWebSocketHandler implements WebSocketHandler, EventBusUser {
    @Autowired
    private ApplicationContext applicationContext;
    
    public static final Map<String, WebSocketSession> SESSION_MAP= Maps.newConcurrentMap();

    /**
     * <p>
     * 注册eventBus
     * </p>
     *
     * @author: taodingkai
     * @since: 2022/8/16 16:23 
     */
    @PostConstruct
    @Override
    public void register() {
        getDefaultEventBus().register(applicationContext.getBean(DefaultWebSocketHandler.class));
    }


    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        log.info("[{}]连接成功",session.getId());
        SESSION_MAP.put(session.getId(),session);
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> webSocketMessage) throws Exception {
        String text = webSocketMessage.getPayload().toString();
        webSocketMessage.getPayload().toString();
        log.info("[{}]收到消息:{}",session.getId(),text);
        sendMessage(session,REPLY_MESSAGE.toWebSocketMessage(StrUtil.format("[{}]服务端已经收到消息[{}]",session.getId(),text)));
    }

    @Override
    public void handleTransportError(WebSocketSession session, Throwable t) throws Exception {
        log.error("[{}]连接出错",session.getId(),t);
        SESSION_MAP.remove(session.getId());
    }

    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        log.info("[{}]连接关闭",session.getId());
        SESSION_MAP.remove(session.getId());
    }

    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * <p>
     * 发送消息
     * </p>
     *
     *
     * @param session
     * @param message
     * @author: taodingkai
     * @since: 2022/8/24 15:18 
     */
    public <T> void sendMessage(WebSocketSession session, com.tdk.mybatisplus.demo.common.entity.WebSocketMessage <T> message) throws JsonProcessingException {
        String messageJson = new ObjectMapper().writeValueAsString(message);
        try {
            WebSocketMessage<String> textMessage=new TextMessage(messageJson);
            session.sendMessage(textMessage);
            log.info("发送消息[{}]给[{}]成功",messageJson);
        } catch (IOException e) {
            log.error("发送消息[{}]给[{}]失败",messageJson,session.getId(),e);
        }
    }

    /**
     * <p>
     * 广播
     * </p>
     *
     *
     * @param message
     * @author: taodingkai
     * @since: 2022/8/24 15:19 
     */
    @Subscribe
    public <T> void sendMessage2AllClient(com.tdk.mybatisplus.demo.common.entity.WebSocketMessage<T> message){
        SESSION_MAP.forEach((key,session)->{
            try {
                sendMessage(session,message);
            } catch (JsonProcessingException e) {
                log.error("发送消息[{}]给[{}]失败",message,session.getId(),e);
            }
        });
    }
}
